Fixed OutOfMemoryError with CPU scheduler in recursive mode.#643
Fixed OutOfMemoryError with CPU scheduler in recursive mode.#643akarnokd wants to merge 1 commit intoReactiveX:masterfrom
Conversation
|
RxJava-pull-requests #573 SUCCESS |
That sounds like a simple issue, as an internal scheduler once complete should have no reference to anything else. If I have not yet grokked why this PR is changing so much when recursion already works fine on Schedulers. |
|
Here is infinite recursion happening inside protected static void testRecursion() throws InterruptedException {
Observable<Long> source = Observable.interval(1, TimeUnit.MILLISECONDS).observeOn(Schedulers.newThread());
final CountDownLatch latch = new CountDownLatch(1);
source.subscribe(new Action1<Long>() {
@Override
public void call(Long l) {
if (l % 1000 == 0) {
System.out.println(l);
}
}
});
// wait indefinitely
latch.await();
} |
|
Recursion only works when the observeOn is used, but many operators use schedulers directly which are not reentrant so they will chain up the subscriptions. |
There was a problem hiding this comment.
This feels wrong that we have a "helper" that schedulers extend from. That implies that the Scheduler interface is wrong.
/cc @headinthebox
There was a problem hiding this comment.
I needed this as the scheduler working with a reentrant scheduler should provide some core scheduling operations without creating chained subscriptions through the standard API. But ExecutorScheduler could implement this privately so it doesn't show up in the signature.
There was a problem hiding this comment.
If the default (Executor)Scheduler would schedule the stateless tasks without turning them into a recursive call, there were no need to use a helper:
/**
* Schedules an action to be executed.
*
* @param action
* action
* @return a subscription to be able to unsubscribe from action.
*/
public Subscription schedule(final Action0 action) {
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
@Override
public Subscription call(Scheduler scheduler, Void state) {
action.call();
return Subscriptions.empty();
}
});
}|
Closing: Revised in PR #648 |
|
This seems like it's a bug inside current Schedulers as recursion should work without memory leaks. Here is the histogram showing the leak: I will dig in to this and get it resolved. We do not need new subscription or scheduler types to solve this. |
Found an issue in Rx.NET regarding an out-of-memory situation due to long chained subscriptions when using recursive scheduling.
The following test program crashes or just hangs indefinitely:
The issue lies in the fact that CPU scheduler (and perhaps the others) are not really reentrant, therefore, they create new composite subscriptions whenever a recursive scheduling happens. So instead of showing a simple subscription to the outside world, a chain of subscription is extended on every recursive schedule call.
A working solution is to have a different scheduler shown to the
Func2<Scheduler, T, Subscription>than the actual CPU scheduler. This new scheduler, calledReentrantScheduler, maintains internal subscriptions which get replaced if a recursive scheduling is executed.I found three issues with this new approach and the old tests:
SchedulersTest.testRecursiveScheduler2, the logic relied on the expectation that the call on L338 is executed at least once if the outer subscription is unsubscribed. The new logic stops the schedule chain instantly, so it is very unlikely the call on L338 gets executed after this.OperationObserveOn.Observationtries to solve the underlying issue as well with its own subscription replacer logic. I haven't changed the operator as it might not work with non CPU scheduler after that.ReentrantSchehdulermaintains two subscriptions. One for the result of the schedule calls, and one for the theDiscardableActions. The former subscriptions can be swapped out without unsubscribing the previous completed schedule, but I'm not sure about theDiscardableActions; if I unsubscribe them, tests hang because it basically cancels itself and no further task is executed. This might be a conceptual error inReentrantScheduler; perhaps it should not return the entire composite on each schedule call, but rather return the content of theactionSubonly.